package xiaoa.java.netty;

import java.net.URI;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.concurrent.Future;

/**
 * http连接池实现类
 * @author xiaoa
 * @date 2017年8月19日 上午12:49:56
 * @version V1.0
 *
 */
public class PoolHttpClient  extends SimpleHttpClient {
	
	/**
	 * 最大通道数量
	 */
	private int maxChannelNum = 15;
	
	/**
	 * 通道最大空闲时间
	 */
	private int maxChannelFreeTime = 5 * 60 * 1000 ;
	
	/**
	 * 每条通道最大连接数量
	 */
	private int maxConnections = 100;
	
	/**
	 * 每条通道最大空闲链接
	 */
	private int maxPendingAcquires = 40;
	
	/**
	 * 连接超时时间
	 */
	private int connTimeOut = 30 * 1000;

	/**
	 * 连接池map
	 */
	private Map<String, Map.Entry<ChannelPool,PoolState >>  poolMap = new HashMap<>();
	
	
	/**
	 * 连接池读写锁
	 */
	private Map<String, ReadWriteLock>  poolMapLock  = new HashMap<>();

	/**
	 *  创建一个锁
	 */
	private Lock    lock   =   new ReentrantLock();	
	
	
	/**
	 * 创建一个定时器  定时扫描不需要的线程
	 */
	public ScheduledExecutorService  scheduled = new ScheduledThreadPoolExecutor(1);
	
	
	/**
	 * 
	 * 构造器
	 * <p>Title: </p>
	 * <p>Description: </p>
	 * @author xiaoa
	 * @param maxChannelNum  最大通道数量
	 * @param maxChannelFreeTime  通道最大空闲时间
	 * @param maxConnections  每条通道最大连接数量
	 * @param maxPendingAcquires 每条通道最大空闲链接
	 * @param connTimeOut 创建链接超时时间
	 */
	public PoolHttpClient( int maxChannelNum , int maxChannelFreeTime , int maxConnections , int maxPendingAcquires , int connTimeOut){
		
		this();
		
		this.maxChannelNum = maxChannelNum;
		this.maxChannelFreeTime = maxChannelFreeTime;
		this.maxConnections = maxConnections;
		this.maxPendingAcquires = maxPendingAcquires;
		this.connTimeOut = connTimeOut;

	}
	
	public PoolHttpClient(){
		
		// 初始化定时器
		initRecoveryScheduled();
	}
	

	
	/**
	 * 初始化回收定时器
	 * @Title: initRecoveryScheduled
	 * @author xiaoa
	 */
	private void initRecoveryScheduled(){
		
		Runnable run = new Runnable() {
			
			@Override
			public void run() {
				
				
				try {
					
					// 当先连接池所有的key
					String [] keys = null;
					
					lock.lock();
					try {
						
						keys =  poolMap.keySet().toArray(new String[poolMap.size()]);
						
					} finally {
						lock.unlock();
					}
					
					
					if (keys == null ){
						
						if (Log.isDebug()){
							
							Log.debug(" poolMap isEmpty ");
							return ;
						}
					}
					
					// 如果设置了超时时间
					if (maxChannelFreeTime > 0){
						
						int num = 0;
						
						long startTime = System.currentTimeMillis();
						
						for (String key : keys){
							
							Map.Entry<ChannelPool,PoolState > poolE = poolMap.get(key);
							
							// 获取当前连接池状态对象
							PoolState  state = poolE.getValue();
							
							// 如果pool空闲时间超过了设置时间
							if (state.useNum == 0 && (System.currentTimeMillis() - state.lastTime > maxChannelFreeTime)){
								
								// 获取该连接池读写锁
								ReadWriteLock rwLock =   poolMapLock.get(key);
								
								rwLock.writeLock().lock();;
								try {
									
									// 关闭连接
									poolE.getKey().close();
									
									// 删除连接池子
									poolMap.remove(key);
									
									// 删除锁
									poolMapLock.remove(key);
									
								} finally{
									rwLock.writeLock().unlock();
								}
								
								num ++;
							}
							
						}
						
						
						if (Log.isDebug()){
							Log.debug(" recovery num [" + num + "]  current num [" + poolMap.size() + "]  useTime [" + (System.currentTimeMillis() - startTime) + "] " );
						}
						
					}else{
						
						if (Log.isDebug()){
							Log.debug(" no set maxChannelFreeTime Not execute recovery  current num [" + poolMap.size() + "]   " );
						}
						
					}
					
					
				} catch (Exception e) {

					e.printStackTrace();
				}
				
			}
		};
		
		
		
		// 设置1秒执行一次
		scheduled.scheduleAtFixedRate(run, 0L,1L,TimeUnit.SECONDS);
		
		
	}
	
	
	/**
	 * 获取连接池
	 * @Title: getChannelPool
	 * @param key
	 * @author xiaoa
	 */
	public Map.Entry<ChannelPool,PoolState > getChannelPool(String key){
		
		// 获取当前连接池读写锁
		ReadWriteLock  readL =  poolMapLock.get(key);
		
		if (readL == null){
			return null;
		}
		
		readL.readLock().lock();
		readL.readLock().unlock();

		// 获取连接池
		Map.Entry<ChannelPool,PoolState >  poolE = poolMap.get(key);
		
		if (poolE == null){
			return null;
		}
		
		return poolE;
		
	}
	
	

	/**
	 * 创建通道key
	 * @Title: createKey
	 * @param uri
	 * @return
	 * @author xiaoa
	 */
	private static String createKey(URI uri){
		
		int port = getPort(uri);
		
		// 拼接key
		String key = uri.getScheme() + "://" + uri.getHost() + ":" + port;
		
		return key;
	}
	
	
	/**
	 * 获取url连接池子
	 * @Title: getPool
	 * @return
	 * @author xiaoa
	 */
	private Map.Entry<ChannelPool,PoolState >   getPool(URI uri){
		
		if (uri == null || uri.toString().isEmpty()){
			throw new RuntimeException(" uri is null");
		}
	
		int port = getPort(uri);
		
		// 拼接key
		String key = createKey(uri);
		
		Map.Entry<ChannelPool,PoolState >   poolE =  getChannelPool(key);
		
		// 如果没有连接池，初始化连接池
		if (poolE == null){
			
			lock.lock();
			try {
				
	            poolE = getChannelPool(key);
				
				if (poolE == null){
					
					if (poolMap.size() >= maxChannelNum){
						throw new RuntimeException(" pool is full");
					}
					
					// 创建管理器
					Bootstrap boot = initBootstrap(connTimeOut);
					
					// 建立连接
					boot.remoteAddress(uri.getHost(), port);
					
					// 创建连接池
					ChannelPool  pool = new FixedChannelPool(boot, new SimpleChannelPoolHandler(), maxConnections , maxPendingAcquires);
					
					 // 添加到连接池map中
					PoolState  state = new PoolState();
					
					poolE = new AbstractMap.SimpleEntry<ChannelPool, PoolState>( pool , state);
					poolMap.put(key, poolE);
					poolMapLock.put(key, new ReentrantReadWriteLock());					 
				}
			}finally{ 
				lock.unlock();
			}
		}
		
		
		return poolE;
	}
	
	
	
	/**
	 * 获取一个链接
	 * @Title: acquire
	 * @param uri
	 * @return
	 * @author xiaoa
	 */
	private  Future<Channel>  acquire(URI uri) {
		
		 Map.Entry<ChannelPool,PoolState >   poolE =  getPool(uri);

		 // 获取一个url
		 Future<Channel> fu = poolE.getKey().acquire();
		 
		 if (fu == null ){
			throw new RuntimeException(" acquire is null");
		 }
		 // 拿一个url
		 poolE.getValue().take();
		 
		 return fu;
		
	}
	
	
	
    /**
     * 发起请求
     */
	@Override
	public HttpResponseResult doRequest(final HttpRequest request, final int socketTimeOut ,final Map<String, String> handerMap) throws Exception {
		
		
		if (request == null ){
			throw new RuntimeException(" request is null");
		}
		
		final URI uri =  new URI(request.uri());
		
		if (uri == null || uri.toString().isEmpty()){
			throw new RuntimeException(" uri is null");
		}
		
		// 获取一个连接
		Future<Channel> fu = acquire(uri);
		
	    return doPutReuqest(fu, request, socketTimeOut, handerMap);
	}
	
	@Override
	public void release(Channel channel, URI uri) throws Exception {
		
		String key = createKey(uri);
		
		if (poolMap.containsKey(key)){

			 // 删除连接超时handler
			 HttpHandlerUtils.doRemoveHanlder(HttpHandlerUtils.HTTP_SOCKET_TIMEOUT, channel.pipeline());
			 
			 // 删除请求handler对象
			 HttpHandlerUtils.doRemoveHanlder(HttpClient.RESULT_HANDLER, channel.pipeline());
			 
			 // 释放到链接池子
			 Map.Entry<ChannelPool,PoolState > pool = getPool(uri);
			 
			 pool.getKey().release(channel);
		     pool.getValue().release();
		     
		}
		
	}


	@Override
	void releaseAndClose(Channel channel, URI uri) throws Exception {
		String key = createKey(uri);

		if (poolMap.containsKey(key)){


			// 释放到链接池子
			Map.Entry<ChannelPool,PoolState > pool = getPool(uri);
			// 关闭通道
		    channel.close();

			pool.getKey().release(channel);
			pool.getValue().release();

			
		}
	}
	
	
	/**
	 * 连接池当前状态
	 * @author xiaoa
	 * @date 2017年8月20日 下午10:37:48
	 * @version V1.0
	 *
	 */
	private static class PoolState{
		
		/**
		 *  创建一个锁
		 */
		private Lock    lock   =   new ReentrantLock();	
		
		/**
		 * 最后一次操作时间
		 */
		public long lastTime = 0;
		
		/**
		 * 使用增量
		 */
		public int useNum = 0;
		
		
		/**
		 * 加
		 * @Title: add
		 * @author xiaoa
		 */
		public void take(){
			
			lock.lock();
			
			this.lastTime = System.currentTimeMillis();
			this.useNum ++;
			
			lock.unlock();
		}
		
		
		/**
		 * 减
		 * @Title: add
		 * @author xiaoa
		 */
		public void release(){
			
			lock.lock();
			
			this.lastTime = System.currentTimeMillis();
			this.useNum --;
			
			lock.unlock();
		}
		
		
		public  PoolState(){
			this.lastTime = System.currentTimeMillis();
			
		}
		
		
		
	}
	
	

}
